package com.cloudansys.core.flink.function;

import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.NumUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;

@Slf4j
public class WEProcessor extends ProcessFunction<List<MultiDataEntity>, List<MultiDataEntity>> {

    private static Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = DefaultConfig.getJedis();
    }

    /**
     * 根据实时风向和设置的叶片角度实时计算【风/叶夹角】
     */
    @Override
    public void processElement(List<MultiDataEntity> elements, Context ctx, Collector<List<MultiDataEntity>> out) throws Exception {
        Double bladeAngle = getBladeAngle();
        List<MultiDataEntity> resList = new ArrayList<>();
        // 获取设置的叶片角度
        for (MultiDataEntity element : elements) {
            Double[] values = element.getValues();
            // 把第三个指标设为风/叶夹角
            log.info("====【bladeAngle: {}】====", bladeAngle);
            values[2] = computeBladeAngle(values[1], bladeAngle);
            element.setValues(values);
            resList.add(element);
        }
        out.collect(resList);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis != null) {
            jedis.close();
        }
    }

    /**
     * 获取设置的叶片角度
     */
    private Double getBladeAngle () {
        double bladeAngle = 0.0;
        String redisValue = jedis.get(Const.ANGLE_BLADE);
        if (redisValue != null) {
            bladeAngle = Double.parseDouble(redisValue);
        } else {
            log.error("叶片角度获取失败！");
        }
        return bladeAngle;
    }

    /**
     * 根据实时风向(wd)和叶片角度(ba)实时计算【风/叶夹角】
     */
    private Double computeBladeAngle (double wd, double ba) {
        double bladeAngle;
        double diff = Math.abs(wd - ba);
        if (diff < 180) {
            if (diff > 90) {
                bladeAngle = 180 - diff;
            } else {
                bladeAngle = diff;
            }
        } else {
            if (diff <= 270) {
                bladeAngle = diff - 180;
            } else {
                bladeAngle = 360 - diff;
            }
        }
        return NumUtil.formatDouble(bladeAngle);
    }

}
